package com.jjzhk.common.handler;

import com.jjzhk.common.bean.RpcRequest;
import com.jjzhk.common.bean.RpcResponse;
import com.jjzhk.common.utils.ProtoStuffSerializationUtils;
import com.jjzhk.common.utils.ReflexUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * RpcClientHandler
 *
 * @author : JJZHK
 * @date : 2016-08-14
 * @comments :
 **/
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
    private Logger logger = LoggerFactory.getLogger(RpcClientHandler.class);
    private RpcResponse response;
    private String address;
    private int port;
    private String SerializationClass;

    public RpcClientHandler(String address, int port, String SerializationClass) {
        this.address = address;
        this.port = port;
        this.SerializationClass = SerializationClass;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse response) throws Exception {
        this.response = response;
    }

    public RpcResponse send(RpcRequest request) throws Exception
    {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 创建并初始化 Netty 客户端 Bootstrap 对象
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);

            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    Class<?> forSerializationClass = null;
                    if (StringUtils.isEmpty(SerializationClass))
                    {
                        forSerializationClass = ProtoStuffSerializationUtils.class;
                    }
                    else
                    {
                        forSerializationClass = ReflexUtils.forName(SerializationClass);
                    }
                    channel.pipeline().addLast("decoder", new RpcEncoder(RpcRequest.class, forSerializationClass));
                    channel.pipeline().addLast("encoder", new RpcDecoder(RpcResponse.class, forSerializationClass));
                    channel.pipeline().addLast(RpcClientHandler.this); // 处理 RPC 响应
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            // 连接 RPC 服务器
            ChannelFuture future = bootstrap.connect(address, port).sync();
            // 写入 RPC 请求数据并关闭连接
            Channel channel = future.channel();
            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();
            // 返回 RPC 响应对象
            return response;
        } finally {
            group.shutdownGracefully();
        }
    }
}
